[SPARK-27070] Fix performance bug in DefaultPartitionCoalescer #23986
fitermay wants to merge 4 commits into apache:master from
Conversation
The indent is off here. Returning the result of a subtraction in compare can overflow, though I don't think it can happen in practice here. Still, see below, I think we can just remove this.
When trying to coalesce a UnionRDD of two large FileScanRDDs (each with a few million partitions) into around 8k partitions, the driver can stall for over an hour. A profiler shows that over 90% of the time is spent in TimSort, which is invoked by `pickBin`. This patch replaces sorting with a more efficient `min` for the purpose of finding the least occupied `PartitionGroup`.
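The core of the change can be sketched in a standalone snippet (`Group` here is a hypothetical stand-in for the real `org.apache.spark.rdd.PartitionGroup`, and this is not the actual Spark code):

```scala
// Picking the least occupied group with a linear-time min instead of a sort.
case class Group(numPartitions: Int)

val groupArr = Seq(Group(3), Group(1), Group(2))

// Before: O(g log g) per pickBin call -- sort everything, read the head.
val bySort = groupArr.sortBy(_.numPartitions).head

// After: O(g) per call -- a single scan for the minimum.
val byMin = groupArr.minBy(_.numPartitions)

assert(bySort == byMin && byMin.numPartitions == 1)
```

Both expressions pick the same group; only the cost per call differs, which matters when `pickBin` is invoked once per input partition.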
@srowen Hi, thanks for the prompt review. I've amended the code to address your comments.
Test build #4597 has finished for PR 23986 at commit
@fitermay It would be very nice to see some rough numbers regarding the improvement caused by this PR. Could you please share how it behaved before and after for the experiment mentioned in the description?
There's a little detail in the JIRA that's pretty suggestive that this is the bottleneck; if there's a stack trace or more numbers to show, that's great. Regardless, I think this is a clean 'win', just a question of how big.
@srowen @attilapiros It turns out EMRFS returns the string '*' as the host of each block. This ends up invoking the worst case of this algorithm, where it tries to jam everything into the same preferred partition. In turn, this ends up running a sort on hundreds of thousands of records each iteration to find the minimum. I've contacted the EMR team to suggest changing the host to 'localhost', but apparently that would break MR performance on YARN. I still think this patch is a win because:
I will try to make the suggested changes and also generate some performance numbers for the extreme case tonight. Thanks!
Benchmark with 100K blocks instead of several million. Number of hosts = 1 is clearly the worst case. After patch:
@fitermay @attilapiros From the benchmark it seems that hosts = 1 is the absolute worst case and this change improves it by a decent margin. It improves the other cases slightly.
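A rough cost model suggests why the single-host case is the worst one (hypothetical numbers, not the benchmark figures above): with one host every partition lands in the same preferred bucket, so the old code sorts the whole group list on every placement.

```scala
// Approximate comparison counts for placing p partitions into g groups.
val p = 100000L // partitions to place (assumed, matching the 100K benchmark)
val g = 8000L   // target coalesced groups (assumed)

// Sorting per placement: ~p * g log2(g) comparisons in total.
val sortComparisons = p * (g * math.log(g) / math.log(2)).toLong

// A linear min scan per placement: ~p * g comparisons in total.
val minComparisons = p * g

assert(sortComparisons > minComparisons)
```

The gap grows with the log factor, which is why the improvement is largest exactly in the degenerate one-host case.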
attilapiros
left a comment
Impressive results.
Thanks for the improvement!
By the way, these are the results from the original PR before replacing
Hm! That's surprising. Looking at `min` vs `minBy`, it even seems like `min` has more indirection (it calls `foldLeft`). The implicit still involves calling a function to compare and get the number of partitions in both cases. If you're pretty sure this is accurate, I'm OK returning to the implicit.
Can I check the reason for this difference on Monday/Tuesday? I mean, can we wait with the merge?
Hi, @fitermay. Thank you for your first contribution. I saw the good comments above, and I left a few comments, too.
After fixing those, we can trigger Jenkins for your PR. Otherwise, it will fail to build.
ok to test
Test build #103221 has finished for PR 23986 at commit
|
@srowen Sets up the lambda that's passed into `minBy`. Notice that the return type of the closure must be
The lambda first invokes the below function, whose only job is to box the primitive int
Then the actual method that returns
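The boxing described above can be sketched in isolation (this is an assumption based on the discussion, not decompiled Spark bytecode; `Group` is a hypothetical stand-in for `PartitionGroup`):

```scala
// minBy's key function is a generic A => B, so an Int key is boxed to
// java.lang.Integer on every element; a hand-written Ordering that compares
// the int fields directly avoids the per-element key boxing.
case class Group(numPartitions: Int)
val groups = Seq(Group(5), Group(2), Group(7))

// minBy: the Int returned by the key function goes through the erased B.
val viaMinBy = groups.minBy(_.numPartitions)

// min with a dedicated Ordering: compare works on unboxed ints.
val groupOrdering: Ordering[Group] = new Ordering[Group] {
  override def compare(o1: Group, o2: Group): Int =
    java.lang.Integer.compare(o1.numPartitions, o2.numPartitions)
}
val viaMin = groups.min(groupOrdering)

assert(viaMinBy == viaMin && viaMin.numPartitions == 2)
```

Both forms return the same group; the difference is only in how many temporary `Integer` objects each comparison allocates.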
- Use `min` with Ordering instead of `minBy` to avoid boxing overhead
- Add benchmark results
- Add benchmarks description
- Don't use DebugFilesystem in benchmark
- Fix scalastyle
- Fix some minor existing codestyle issues in CoalescedRDD.scala
@dongjoon-hyun @srowen Pushed these changes:
Test build #103271 has finished for PR 23986 at commit
Test build #103275 has finished for PR 23986 at commit
retest this please
sounds good to me
Test build #103278 has finished for PR 23986 at commit
@dongjoon-hyun
Merged to master
```scala
implicit val partitionGroupOrdering: Ordering[PartitionGroup] =
  (o1: PartitionGroup, o2: PartitionGroup) =>
    java.lang.Integer.compare(o1.numPartitions, o2.numPartitions)
```
Hi, All.
This seems to break the Scala 2.11 build.
```
[error] ../core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala:161: type mismatch;
[error]  found   : (org.apache.spark.rdd.PartitionGroup, org.apache.spark.rdd.PartitionGroup) => Int
[error]  required: Ordering[org.apache.spark.rdd.PartitionGroup]
[error]     (o1: PartitionGroup, o2: PartitionGroup) =>
[error]
```
Thanks. That’s unfortunate. I’ll fix it later tonight
Then, let me revert this first to recover the 2.11 build for the other PRs. Since this PR is already approved, I believe that the next PR will be easily accepted, @fitermay.
@dongjoon-hyun @srowen: Would it be a good idea to extend the PR builder to run a compile with scala 2.11 (without any test run)?
I know it is an extra 10-15 minutes, but given the 4-hour test run it might be worth it to prevent such situations; on the other hand, this must be very rare. What is your opinion?
I agree with you, @attilapiros. But, IIRC, there was a discussion on that issue, and the decision at that time was that the current cost is not high enough for that.
The committers have a responsibility to monitor their commits, and we are usually able to do a HOTFIX or revert in a short time.
Ok, thanks.
Yes, it must be very rare.
We're going to drop 2.11 support soonish anyway, so I think for now we accept the occasional breaks and fix after the fact rather than double the PR builders.
This is reverted via 4bab69b.
The Scala 2.11 build is recovered and testing is now on the way.
@fitermay I guess it has to be more explicitly constructed as an
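Scala 2.11 lacks SAM conversion for traits like `Ordering` (2.12 added it), which is why the lambda form above fails to compile there. A 2.11-compatible version can be sketched by spelling out the anonymous class (a hypothetical standalone example; `Group` stands in for the real `PartitionGroup`):

```scala
// Explicit Ordering construction that compiles on both 2.11 and 2.12.
case class Group(numPartitions: Int)

implicit val groupOrdering: Ordering[Group] = new Ordering[Group] {
  override def compare(o1: Group, o2: Group): Int =
    java.lang.Integer.compare(o1.numPartitions, o2.numPartitions)
}

// With the implicit in scope, min picks the least occupied group.
assert(Seq(Group(4), Group(1), Group(9)).min.numPartitions == 1)
```

The anonymous-class form carries the same behavior with no reliance on the 2.12-only lambda-to-trait conversion.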
When trying to coalesce a UnionRDD of two large FileScanRDDs
(each with a few million partitions) into around 8k partitions
the driver can stall for over an hour.
Profiler shows that over 90% of the time is spent in TimSort
which is invoked by `pickBin`. This patch replaces sorting with a more
efficient `min` for the purpose of finding the least occupied `PartitionGroup`.